package kafka_go

import (
	sarama "gitee.com/tym_hmm/go-kafa-shopify-sarama"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// productClientPool is the producer connection pool. Producer clients are
// grouped into one container per builder.
type productClientPool struct {
	// pool maps a builder's hash code to the container holding that builder's
	// producer clients. It is allocated lazily by addClient.
	pool  map[int64]*containerProduct
	// rLock guards pool: lookups take the read lock, while adding and closing
	// clients take the write lock.
	rLock sync.RWMutex
}

// newProductClientPool returns an empty producer pool. The underlying map is
// left nil here and is allocated by the first addClient call.
func newProductClientPool() *productClientPool {
	return new(productClientPool)
}

// GetClient looks up a producer client for the builder called buildName.
//
// The name is trimmed of surrounding whitespace before it is hashed. It
// returns KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY when the trimmed name is empty
// and KAFKA_PRODUCT_ERROR_BUILD_CLIENT_EMPTY when no container is registered
// for the name's hash. Otherwise it returns the index and client chosen by
// the builder's container.
func (this *productClientPool) GetClient(buildName string) (int32, sarama.SyncProducer, error) {
	trimmed := strings.TrimSpace(buildName)
	if trimmed == "" {
		return 0, nil, KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY
	}
	key := hashCode(trimmed)

	// Only a read lock is needed: the pool map itself is not modified here.
	this.rLock.RLock()
	defer this.rLock.RUnlock()

	container, found := this.pool[key]
	if !found {
		return 0, nil, KAFKA_PRODUCT_ERROR_BUILD_CLIENT_EMPTY
	}
	index, producer := container.GetClient()
	return index, producer, nil
}

//func (this *pro)()  {
//
//}

// closeClientBatch closes every producer client registered for build and
// removes the builder's entry from the pool.
//
// It does nothing when the builder has no name, when no container is
// registered for the builder's hash code, or when the container's client
// list is nil. Errors returned by Close are discarded.
func (this *productClientPool) closeClientBatch(build BuildProductApi) {
	if len(build.GetName()) == 0 {
		return
	}

	this.rLock.Lock()
	defer this.rLock.Unlock()

	key := build.GetHashCode()
	container, found := this.pool[key]
	if !found || container == nil {
		return
	}
	producers := container.getClientList()
	if producers == nil {
		return
	}
	for _, producer := range producers {
		// Best-effort shutdown: a failing Close must not stop the others.
		_ = producer.Close()
	}
	delete(this.pool, key)
}

// addClient opens the producer connections configured for builds and stores
// them in the pool under the builder's hash code.
//
// builds.GetMaxConnection() connections are opened concurrently; a value of
// zero or less is treated as one. The pool's write lock is held for the whole
// call, so GetClient never observes a partially filled container.
//
// It returns KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY when the builder has no
// name. If any connection attempt fails, the function still waits for all
// attempts to finish, then closes every client held by the builder's
// container, removes the builder's entry from the pool and returns the first
// error that was reported.
func (this *productClientPool) addClient(builds BuildProductApi) error {
	if len(builds.GetName()) == 0 {
		return KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY
	}

	this.rLock.Lock()
	defer this.rLock.Unlock()

	if this.pool == nil {
		this.pool = make(map[int64]*containerProduct)
	}

	// Resolve the container once, under the lock, so the worker goroutines
	// below never touch the pool map themselves.
	hash := builds.GetHashCode()
	container, ok := this.pool[hash]
	if !ok {
		container = newContainerProduct(builds)
		this.pool[hash] = container
	}

	connectionLen := builds.GetMaxConnection()
	if connectionLen <= 0 {
		connectionLen = 1
	}

	// The error channel has room for one error per worker, so a failing
	// worker can always report and exit instead of blocking forever.
	errorChannel := make(chan error, connectionLen)
	var (
		workers    sync.WaitGroup
		readyCount int32
	)
	for i := int32(0); i < connectionLen; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			client, err := this.clientConnection(builds)
			if err != nil {
				errorChannel <- err
				return
			}
			if err = container.addClient(client); err != nil {
				// NOTE(review): client is not closed here because it is not
				// visible from this file whether the container retains it on
				// error; confirm and close it if the container never keeps it.
				errorChannel <- err
				return
			}
			atomic.AddInt32(&readyCount, 1)
		}()
	}

	// Wait for every attempt, not just the first failure: a connection that
	// succeeds after an early return would otherwise never be closed.
	workers.Wait()
	close(errorChannel)

	if atomic.LoadInt32(&readyCount) == connectionLen {
		return nil
	}

	// At least one worker failed, so the channel holds at least one error.
	firstErr := <-errorChannel

	// Clean up inline. closeClientBatch must not be called here: it takes
	// rLock.Lock() again, and sync.RWMutex is not reentrant, so the call
	// would deadlock while this function still holds the write lock.
	if container != nil {
		if clientList := container.getClientList(); clientList != nil {
			for _, client := range clientList {
				// Best-effort shutdown: a failing Close must not stop the others.
				_ = client.Close()
			}
		}
	}
	delete(this.pool, hash)

	return firstErr
}

// clientConnection creates one synchronous producer for build.
//
// Before connecting it installs a retry backoff function on the builder's
// configuration that logs each retry attempt and waits one second between
// attempts. It returns the new producer, or the error reported by
// sarama.NewSyncProducer.
//
// NOTE(review): addClient calls this from several goroutines with the same
// builder. If GetConfig returns a shared pointer, the assignment below is a
// concurrent write to the same field; confirm against the builder
// implementation.
func (this *productClientPool) clientConnection(build BuildProductApi) (sarama.SyncProducer, error) {
	retryBackoff := func(retries, maxRetries int) time.Duration {
		log.InfoNoFileInfo("disconnection builder %s reconnects [%d]", build.GetName(), retries)
		return time.Second
	}
	build.GetConfig().Producer.Retry.BackoffFunc = retryBackoff

	producer, err := sarama.NewSyncProducer(build.GetAddrSlice(), build.GetConfig())
	if err == nil {
		return producer, nil
	}
	return nil, err
}
